/*
 * Wazuh Vulnerability scanner
 * Copyright (C) 2015, Wazuh Inc.
 * March 25, 2023.
 *
 * This program is free software; you can redistribute it
 * and/or modify it under the terms of the GNU General Public
 * License (version 2) as published by the FSF - Free Software
 * Foundation.
 */

#ifndef _DATABASE_FEED_MANAGER_HPP
#define _DATABASE_FEED_MANAGER_HPP

#include "../policyManager/policyManager.hpp"
#include "../scanOrchestrator/scanContext.hpp"
#include "HTTPRequest.hpp"
#include "UNIXSocketRequest.hpp"
#include "cacheLRU.hpp"
#include "contentManager.hpp"
#include "contentRegister.hpp"
#include "defer.hpp"
#include "eventDecoder.hpp"
#include "feedIndexer.hpp"
#include "globalData.hpp"
#include "indexerConnector.hpp"
#include "jsonArrayParser.hpp"
#include "loggerHelper.h"
#include "observer.hpp"
#include "rocksDBWrapper.hpp"
#include "routerSubscriber.hpp"
#include "scanDispatcher.hpp"
#include "storeModel.hpp"
#include "updateCVECandidates.hpp"
#include "updateCVEDescription.hpp"
#include "vulnerabilityCandidate_generated.h"
#include "vulnerabilityDescription_generated.h"
#include "vulnerabilityRemediations_generated.h"
#include "vulnerabilityScanner.hpp"
#include <external/nlohmann/json.hpp>
#include <fstream>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// NOTE(review): a using-directive at header scope leaks NSVulnerabilityScanner into every translation unit
// that includes this file. Unqualified names below (e.g. RemediationInfo, TranslationEntry) presumably rely
// on it - confirm before removing.
using namespace NSVulnerabilityScanner;
// Path handed to the RocksDB wrapper when the feed database is opened by TDatabaseFeedManager.
constexpr auto DATABASE_PATH {"queue/vd/feed"};
// Number of processed elements between intermediate commits while handling an "offsets" message.
constexpr auto OFFSET_TRANSACTION_SIZE {200};

/**
 * @brief Couples a serialized FlatBuffers buffer with a typed view of it.
 *
 * The struct bundles two related pieces of information: the `rocksdb::PinnableSlice` that holds the
 * serialized bytes, and a pointer of type `FlatbufferType` to the deserialized representation of
 * those bytes. Keeping both together gives access to the raw buffer as well as to its parsed form.
 *
 * @tparam FlatbufferType FlatBuffers object type exposed through `data`.
 */
template<typename FlatbufferType>
struct FlatbufferDataPair final
{
    /**
     * @brief Serialized FlatBuffers data.
     *
     * `rocksdb::PinnableSlice` containing the serialized FlatBuffers bytes.
     */
    rocksdb::PinnableSlice slice;

    /**
     * @brief Deserialized FlatBuffers data.
     *
     * Pointer to the parsed object of type `FlatbufferType`. It stays null until it is assigned.
     */
    const FlatbufferType* data {nullptr};
};

/**
 * @brief DatabaseFeedManager class.
 *
 * @tparam TIndexerConnector Indexer connector type.
 * @tparam TPolicyManager Policy manager type.
 * @tparam TContentRegister Content register type.
 * @tparam TRouterSubscriber Router subscriber type.
 * @tparam TUNIXSocketRequest UNIX socket request type.
 * @tparam TRocksDBWrapper RocksDB wrapper type.
 */
template<typename TIndexerConnector = IndexerConnector,
         typename TPolicyManager = PolicyManager,
         typename TContentRegister = ContentRegister,
         typename TRouterSubscriber = RouterSubscriber,
         typename TUNIXSocketRequest = UNIXSocketRequest,
         typename TRocksDBWrapper = Utils::RocksDBWrapper>
class TDatabaseFeedManager final : public Observer<nlohmann::json&>
{
public:
    /**
     * @brief Processes and validates the message received by the router.
     *
     * The message must be a JSON object containing the "paths", "type" and "offset" keys. Two message types are
     * handled:
     *  - "offsets": every listed file is parsed as a JSON array (elements under "/data") and each element is
     *    orchestrated inside a feed database transaction that is committed periodically.
     *  - "raw": the feed database is emptied and the single listed file is orchestrated line by line.
     * In both cases the vendor and OS CPE maps are reloaded into memory afterwards.
     *
     * @param message Message received by the router.
     * @param topicName Topic name, forwarded to the content manager whenever the offset is updated.
     * @param orchestration Chain of actions to execute for each valid resource extracted from the message.
     *
     * @throws std::runtime_error If the message is not valid JSON, lacks a required key or has an unknown type;
     *         if a "raw" message does not list exactly one path; if an input file cannot be opened; or if a
     *         line of a "raw" file is not valid JSON or lacks the "name" key.
     */
    void processMessage(const std::vector<char>& message,
                        const std::string& topicName,
                        const std::function<void(const nlohmann::json&, Utils::IRocksDBWrapper*)>& orchestration)
    {
        // Parse without exceptions: a malformed payload yields a discarded value that is rejected below.
        auto parsedMessage = nlohmann::json::parse(message, nullptr, false);

        if (parsedMessage.is_discarded() || !parsedMessage.contains("paths") || !parsedMessage.contains("type") ||
            !parsedMessage.contains("offset"))
        {
            throw std::runtime_error("Invalid message");
        }

        if (parsedMessage.at("type") == "offsets")
        {
            auto jsonPointer {"/data"_json_pointer};
            for (const auto& path : parsedMessage.at("paths"))
            {
                // Exclusive access is held for the whole file.
                std::lock_guard<std::shared_mutex> lock(m_mutex);

                auto feedTransaction = m_feedDatabase->createTransaction();
                auto currentOffset = 0LL;
                // Tracks whether the elements processed so far are already committed, so that the trailing
                // commit below only runs when there is pending data.
                auto committedInLastIteration = true;
                logInfo(WM_VULNSCAN_LOGTAG, "Processing file: %s", path.get_ref<const std::string&>().c_str());

                // Parse the file and execute the chain/orchestration for each valid resource.
                // The lambda function returns false if the module is stopped.
                // This uses the json sax api, so it is faster than the json tree api and it consumes less memory.
                // LCOV_EXCL_START
                JsonArray::parse(
                    path,
                    [&](nlohmann::json&& item, const size_t itemId)
                    {
                        committedInLastIteration = false;
                        orchestration(item, feedTransaction.get());

                        // Extract the offset from the last element.
                        currentOffset = item.at("offset");

                        // Commit the transaction for every 200 elements.
                        // NOTE(review): if itemId is zero-based the very first element also triggers a commit -
                        // confirm against JsonArray::parse.
                        if (itemId % OFFSET_TRANSACTION_SIZE == 0)
                        {
                            // Commit the transaction.
                            feedTransaction->commit();

                            // Update the offset in the content manager.
                            contentManagerUpdateOffset(topicName, currentOffset);

                            // Close transaction before reopen the DB to avoid unexpected behavior when the DB is
                            // closed.
                            feedTransaction.reset();

                            // Reinitialize the database to avoid high memory consumption, this is because after update
                            // some key during this process the database will consume more memory.
                            m_feedDatabase->reopen();

                            feedTransaction = m_feedDatabase->createTransaction();
                            committedInLastIteration = true;
                        }
                        return !m_shouldStop.load();
                    },
                    jsonPointer);
                // LCOV_EXCL_STOP

                // If the module is stopped, we do not commit the transaction and update the offset,
                // so that the next time the module is started, it will start from the last offset committed.
                if (m_shouldStop.load())
                {
                    break;
                }
                else
                {
                    if (!committedInLastIteration)
                    {
                        // Commit the transaction for the remaining elements.
                        feedTransaction->commit();

                        // If some data was processed, update the offset.
                        contentManagerUpdateOffset(topicName, currentOffset);
                    }

                    // Close transaction before reopen the DB to avoid unexpected behavior when the DB is closed.
                    feedTransaction.reset();

                    // Reinitialize the database to avoid high memory consumption, this is because after update some
                    // key during this process the database will consume more memory.
                    m_feedDatabase->reopen();
                }
            }

            // Verify vendor-map and oscpe-map values and update the maps in memory
            // This runs after the per-file lock has been released; reloadGlobalMaps() locks the mutex itself.
            reloadGlobalMaps();
        }
        else if (parsedMessage.at("type") == "raw")
        {
            // The message contains a list of files to process.
            // For the case of the raw message, the list of files is always one, because it uses the consolidated file.
            if (parsedMessage.at("paths").size() != 1)
            {
                throw std::runtime_error("Invalid message");
            }
            // Delete all the data in the database, because the raw message contains all and latest data.
            // NOTE(review): this call is made without holding m_mutex - confirm no reader can run concurrently.
            m_feedDatabase->deleteAll();

            for (const auto& path : parsedMessage.at("paths"))
            {
                logInfo(WM_VULNSCAN_LOGTAG, "Processing file: %s", path.get_ref<const std::string&>().c_str());
                std::ifstream file(path.get_ref<const std::string&>());
                if (!file.is_open())
                {
                    throw std::runtime_error("Unable to open input file: " + path.get_ref<const std::string&>());
                }

                std::string line;
                int32_t step = 0;
                // Parse the file and execute the chain/orchestration for each valid resource.
                // It orchestrate line by line.
                while (std::getline(file, line))
                {
                    if (m_shouldStop.load())
                    {
                        break;
                    }
                    // Acquiring exclusive access to write new data
                    // The lock is taken per line, so it is released between lines.
                    std::lock_guard<std::shared_mutex> lock(m_mutex);

                    if (step++ % 1000 == 0)
                    {
                        // step was already incremented by the condition above, so the value logged is 1-based.
                        logDebug2(WM_VULNSCAN_LOGTAG, "Processing line: %d", step);
                        // Flush the database every 1000 lines, we don't need to reopen the database, because all of the
                        // elements are inserted.
                        m_feedDatabase->flush();
                    }

                    nlohmann::json parsedLine = nlohmann::json::parse(line, nullptr, false);
                    if (parsedLine.is_discarded() || !parsedLine.contains("name"))
                    {
                        throw std::runtime_error("Invalid line. file: " + path.get_ref<const std::string&>());
                    }

                    // Add the "resource" and "type" keys before handing the line to the orchestration; every
                    // line of a raw file is treated as a creation.
                    parsedLine["resource"] = parsedLine["name"];
                    parsedLine["type"] = "create";

                    // Raw content is written straight to the database, without a transaction.
                    orchestration(parsedLine, m_feedDatabase.get());
                }
            }

            // If the module is stopped, we do not update the offset.
            // So that the next time the module is started, it will start from zero.
            if (!m_shouldStop.load())
            {
                m_feedDatabase->flush();

                // Update the offset.
                contentManagerUpdateOffset(topicName, parsedMessage.at("offset"));
            }

            // Verify vendor-map and oscpe-map values and update the maps in memory
            reloadGlobalMaps();
        }
        else
        {
            throw std::runtime_error("Invalid message");
        }
    }

    /**
     * @brief Class constructor.
     *
     * @param indexerConnector Indexer connector.
     * @param shouldStop Variable to control the graceful shutdown of the module.
     * @param mutex Mutex to protect the access to the internal databases.
     * @param isLocalSubscriber Configures the router subscription lambda execution as local or remote.
     * @param reloadGlobalMapsStartup If true, the vendor and os cpe maps will be reloaded at startup.
     * @param initContentUpdater If true, the content updater will be initialized.
     */
    // TODO: Remove LCOV flags once the implementation of the 'Indexer Connector' module is completed
    // LCOV_EXCL_START
    explicit TDatabaseFeedManager(std::shared_ptr<TIndexerConnector> indexerConnector,
                                  const std::atomic<bool>& shouldStop,
                                  std::shared_mutex& mutex,
                                  const bool isLocalSubscriber = true,
                                  const bool reloadGlobalMapsStartup = true,
                                  const bool initContentUpdater = true)
        : Observer("database_feed_manager")
        , m_indexerConnector(std::move(indexerConnector))
        , m_shouldStop(shouldStop)
        , m_mutex(mutex)
    {
        const auto updaterPolicy = TPolicyManager::instance().getUpdaterConfiguration();
        const std::string topicName = updaterPolicy.at("topicName");

        m_feedDatabase = std::make_unique<TRocksDBWrapper>(DATABASE_PATH, false);

        m_translationsCache =
            std::make_unique<LRUCache<std::string, std::string>>(TPolicyManager::instance().getTranslationLRUSize());

        // This initializes the vendor and os cpe maps and should be called before any scan or message processing.
        if (reloadGlobalMapsStartup)
        {
            reloadGlobalMaps();
        }

        // Subscription to vulnerability detector content update events.
        m_contentUpdateSubscription =
            std::make_unique<TRouterSubscriber>(topicName, "vulnerability_feed_manager", isLocalSubscriber);

        m_contentUpdateSubscription->subscribe(
            [&, topicName](const std::vector<char>& message)
            {
                auto eventDecoder = std::make_shared<EventDecoder>();
                eventDecoder->setLast(std::make_shared<StoreModel>());
                eventDecoder->setLast(std::make_shared<FeedIndexer<TIndexerConnector>>(m_indexerConnector));
                eventDecoder->setLast(std::make_shared<ScanDispatcher>());

                auto orchestrationLambda = [&](const nlohmann::json& resource, Utils::IRocksDBWrapper* feedDatabaseArg)
                {
                    auto eventContext =
                        std::make_shared<EventContext>(EventContext {.message = message,
                                                                     .resource = resource,
                                                                     .feedDatabase = feedDatabaseArg,
                                                                     .resourceType = ResourceType::UNKNOWN});
                    eventDecoder->handleRequest(std::move(eventContext));
                };
                try
                {
                    logInfo(WM_VULNSCAN_LOGTAG, "Processing message");
                    processMessage(message, topicName, orchestrationLambda);
                    logInfo(WM_VULNSCAN_LOGTAG, "Message processed");

                    // TODO: If the databases are compressed, they weigh 2GB,
                    //  but when generating the DEB and RPM package, the package
                    //  goes from 165MB to 465MB.
                    //  If the databases are not compressed, they weigh 4.5GB,
                    //  but when generating the DEB and RPM package,
                    //  the package goes from 165MB to 270MB and 350MB respectively.
                    //  Find another compression algorithm.
                    //
                    // Compact the database
                    // logInfo(WM_VULNSCAN_LOGTAG, "Compacting CVE5 database");
                    // m_cvesDatabase->compactDatabaseUsingBzip2();
                    // logInfo(WM_VULNSCAN_LOGTAG, "CVE5 database compacted successfully");
                    // logInfo(WM_VULNSCAN_LOGTAG, "Compacting description, remediation, translation
                    // and feed databases"); m_descriptionsDatabase->compactDatabase();
                    // m_remediationsDatabase->compactDatabase();
                    // m_feedDatabase->compactDatabase();
                    // for (const auto& [key, db] : m_feedsDatabases)
                    // {
                    //     db->compactDatabase();
                    // }
                    // logInfo(WM_VULNSCAN_LOGTAG, "Databases compacted successfully");
                }
                catch (const std::exception& e)
                {
                    logError(WM_VULNSCAN_LOGTAG, "Error processing message: %s", e.what());

                    const std::string url = "http://localhost/ondemand/" + topicName + "?offset=0";
                    TUNIXSocketRequest::instance().get(
                        HttpUnixSocketURL(ONDEMAND_SOCK, url),
                        [](const std::string& msg) { std::cout << msg << std::endl; },
                        [](const std::string& msg, const long responseCode)
                        { logError(WM_VULNSCAN_LOGTAG, "%s: %ld", msg.c_str(), responseCode); });
                }
            });

        if (initContentUpdater)
        {
            // Vulnerability content updater initialization.
            m_contentRegistration =
                std::make_unique<TContentRegister>(topicName, TPolicyManager::instance().getUpdaterConfiguration());
        }
    }

    /**
     * @brief Looks up the remediation information stored for a CVE.
     *
     * The entry is read from the remediations column of the feed database into the slice of
     * `dtoVulnRemediation`. When an entry exists, it is verified as a FlatBuffers `RemediationInfo` buffer and
     * `dtoVulnRemediation.data` is pointed at it. When no entry exists, the function returns without touching
     * `dtoVulnRemediation.data`.
     *
     * @param cveId The CVE ID for which remediation information is requested.
     * @param dtoVulnRemediation A reference to a `FlatbufferDataPair` object
     *        where the retrieved remediation information will be stored.
     *
     * @throws std::runtime_error if the retrieved data from the database is invalid or
     *         not in the expected FlatBuffers format.
     */
    void getVulnerabilityRemediation(const std::string& cveId, FlatbufferDataPair<RemediationInfo>& dtoVulnRemediation)
    {
        // A missing entry is not an error: there is simply no remediation for this CVE.
        if (!m_feedDatabase->get(cveId, dtoVulnRemediation.slice, REMEDIATIONS_COLUMN))
        {
            return;
        }

        const auto* const buffer = reinterpret_cast<const uint8_t*>(dtoVulnRemediation.slice.data());

        // Validate the stored bytes before exposing them as a typed object.
        flatbuffers::Verifier verifier(buffer, dtoVulnRemediation.slice.size());
        if (!VerifyRemediationInfoBuffer(verifier))
        {
            throw std::runtime_error("Error: Invalid FlatBuffers data in RocksDB.");
        }

        dtoVulnRemediation.data = GetRemediationInfo(buffer);
    }

    /**
     * @brief Retrieves a translation for a given package name using the Wazuh package translation system.
     *
     * The translation cache maps a package name to the key of its translation entry in the translations column
     * of the feed database. On a cache hit the entry is read directly by key, the callback is invoked once and
     * the function returns. On a cache miss every translation entry is visited; the first one whose source
     * product pattern matches the whole package name is handed to the callback and its key is cached.
     *
     * @param packageName The name of the package for which translation is needed.
     * @param callback A callback function that will be invoked with the matching translation entry, if any.
     *                The callback function should take a single argument of type
     * `NSVulnerabilityScanner::TranslationEntry`.
     *
     * @throws std::runtime_error if a cached key is no longer present in the database, or if a stored entry is
     *         not a valid FlatBuffers `TranslationEntry` buffer.
     *
     * @note The entry passed to the callback points into storage owned by this function or by the database
     *       iterator, so it should not be kept after the callback returns.
     */
    void
    getWazuhPackageTranslation(const std::string& packageName,
                               const std::function<void(const NSVulnerabilityScanner::TranslationEntry&)>& callback)
    {
        auto resultCache = m_translationsCache->getValue(packageName);

        // If the translation is in the cache, use it to look up the translation in the database.
        if (resultCache.has_value())
        {
            FlatbufferDataPair<TranslationEntry> dtoVulnTranslation;

            auto resultQuery = m_feedDatabase->get(resultCache.value(), dtoVulnTranslation.slice, TRANSLATIONS_COLUMN);
            if (!resultQuery)
            {
                throw std::runtime_error("Value for key " + resultCache.value() + " does not exist in database.");
            }

            flatbuffers::Verifier verifier(reinterpret_cast<const uint8_t*>(dtoVulnTranslation.slice.data()),
                                           dtoVulnTranslation.slice.size());
            if (!VerifyTranslationEntryBuffer(verifier))
            {
                throw std::runtime_error("Error: Invalid FlatBuffers data in RocksDB.");
            }

            dtoVulnTranslation.data =
                GetTranslationEntry(reinterpret_cast<const uint8_t*>(dtoVulnTranslation.slice.data()));

            callback(*dtoVulnTranslation.data);

            // The cached entry answers the request. Without this return the scan below would run as well and
            // invoke the callback a second time for the same package.
            return;
        }

        // If the translation is not in the cache, iterate over all the translations in the database and look for
        // a match.
        for (const auto& [key, value] : m_feedDatabase->begin(TRANSLATIONS_COLUMN))
        {
            flatbuffers::Verifier verifier(reinterpret_cast<const uint8_t*>(value.data()), value.size());
            if (!VerifyTranslationEntryBuffer(verifier))
            {
                throw std::runtime_error("Error: Invalid FlatBuffers data in RocksDB.");
            }

            auto queryData = GetTranslationEntry(reinterpret_cast<const uint8_t*>(value.data()));

            // FlatBuffers accessors return null for fields that are absent; an entry without a source product
            // pattern cannot match anything, so skip it instead of dereferencing a null pointer.
            const auto source = queryData->source();
            if (source == nullptr || source->product() == nullptr)
            {
                continue;
            }

            // The stored product is a regular expression that must match the whole package name.
            std::regex product_pattern {source->product()->str()};
            if (std::regex_match(packageName, product_pattern))
            {
                callback(*queryData);

                // Remember which entry matched so the next lookup can skip the scan.
                m_translationsCache->insertKey(packageName, key);
                break;
            }
        }
    }

    /**
     * @brief Get the Vulnerabilities Candidates information.
     *
     * Seeks the `cnaName` column of the feed database with the key prefix "<packageName>_CVE". Every value
     * found is verified as a FlatBuffers candidate array and its candidates are passed to the callback in
     * order. Once the callback returns true for a candidate, the remaining candidates of that same array are
     * skipped and the iteration moves on to the next database entry.
     *
     * @param cnaName RocksDB table identifier.
     * @param packageName Package name.
     * @param callback Store vulnerability data.
     *
     * @throws std::runtime_error if any of the names is empty or a stored value fails FlatBuffers verification.
     */
    void getVulnerabilitiesCandidates(
        const std::string& cnaName,
        std::string_view packageName,
        const std::function<bool(const std::string& cnaName,
                                 const NSVulnerabilityScanner::ScanVulnerabilityCandidate&)>& callback)
    {
        if (cnaName.empty() || packageName.empty())
        {
            throw std::runtime_error("Invalid package/cna name.");
        }

        // Key prefix used to position the iterator: package name followed by the "_CVE" separator.
        std::string seekPrefix(packageName);
        seekPrefix += "_CVE";

        for (const auto& [key, value] : m_feedDatabase->seek(seekPrefix, cnaName))
        {
            const auto* const rawBuffer = reinterpret_cast<const uint8_t*>(value.data());

            flatbuffers::Verifier verifier(rawBuffer, value.size());
            if (!NSVulnerabilityScanner::VerifyScanVulnerabilityCandidateArrayBuffer(verifier))
            {
                throw std::runtime_error(
                    "Error getting ScanVulnerabilityCandidateArray object from rocksdb. FlatBuffers verifier failed");
            }

            const auto candidatesArray = GetScanVulnerabilityCandidateArray(rawBuffer);
            if (!candidatesArray)
            {
                continue;
            }

            for (const auto& candidate : *candidatesArray->candidates())
            {
                // If the candidate is vulnerable, we stop looking in this array.
                if (callback(cnaName, *candidate))
                {
                    break;
                }
            }
        }
    }

    /**
     * @brief Gives access to the underlying feed database.
     *
     * The returned reference designates the RocksDB wrapper owned by this object, which is the same database
     * used by the other lookup methods of this class.
     *
     * @return A reference to the database wrapper of type TRocksDBWrapper.
     */
    TRocksDBWrapper& getCVEDatabase()
    {
        auto& feedDatabase = *m_feedDatabase;
        return feedDatabase;
    }

    /**
     * @brief Updates scheduler interval.
     *
     * Does nothing when the content updater was not initialized.
     *
     * @param data Data containing the interval, read from `data["updater"]["interval"]`.
     */
    void update(nlohmann::json& data) override
    {
        // Without a content registration there is no scheduler to reconfigure.
        if (!m_contentRegistration)
        {
            return;
        }

        const auto interval = data.at("updater").at("interval").get<size_t>();
        m_contentRegistration->changeSchedulerInterval(interval);
    }
    // LCOV_EXCL_STOP

    /**
     * @brief Gets descriptive information for a cveid.
     *
     * Reads the entry from the descriptions column of the feed database into `resultContainer.slice`, verifies
     * it as a FlatBuffers `VulnerabilityDescription` buffer and points `resultContainer.data` at it.
     *
     * @param cveId cveid to search.
     * @param resultContainer container struct to store the result.
     *
     * @throws std::runtime_error if there is no entry for the cveid or the stored buffer fails verification.
     */
    void getVulnerabiltyDescriptiveInformation(const std::string_view cveId,
                                               FlatbufferDataPair<VulnerabilityDescription>& resultContainer)
    {
        const std::string lookupKey(cveId);

        if (!m_feedDatabase->get(lookupKey, resultContainer.slice, DESCRIPTIONS_COLUMN))
        {
            throw std::runtime_error(
                "Error getting VulnerabilityDescription object from rocksdb. Object not found for cveId: " +
                lookupKey);
        }

        const auto* const rawBuffer = reinterpret_cast<const uint8_t*>(resultContainer.slice.data());

        flatbuffers::Verifier verifier(rawBuffer, resultContainer.slice.size());
        if (!NSVulnerabilityScanner::VerifyVulnerabilityDescriptionBuffer(verifier))
        {
            throw std::runtime_error(
                "Error getting VulnerabilityDescription object from rocksdb. FlatBuffers verifier failed");
        }

        // The accessor already yields a pointer-to-const, which is what the container stores.
        resultContainer.data = NSVulnerabilityScanner::GetVulnerabilityDescription(resultContainer.slice.data());
    }

    /**
     * @brief Get CNA/ADP name based on the package format.
     *
     * Walks the "format" section of the vendor map and compares the key of the first member of each item with
     * the given format; the first exact match wins.
     *
     * @param format Package format.
     * @return CNA/ADP name. Empty string otherwise.
     */
    std::string getCnaNameByFormat(std::string_view format)
    {
        const auto& vendorMap = GlobalData::instance().vendorMaps();

        if (!vendorMap.contains("format"))
        {
            return {};
        }

        for (const auto& item : vendorMap.at("format"))
        {
            const auto entry = item.begin();
            if (format == entry.key())
            {
                return entry.value();
            }
        }

        return {};
    }

    /**
     * @brief Get CNA/ADP name based on the package vendor when it contains a specific word.
     *
     * Walks the "contains" section of the vendor map and returns the value of the first item whose key occurs
     * anywhere inside the vendor string.
     *
     * @param vendor Package vendor.
     * @return CNA/ADP name. Empty string otherwise.
     */
    std::string getCnaNameByContains(std::string_view vendor)
    {
        const auto& vendorMap = GlobalData::instance().vendorMaps();

        if (!vendorMap.contains("contains"))
        {
            return {};
        }

        for (const auto& item : vendorMap.at("contains"))
        {
            const auto entry = item.begin();
            const auto position = vendor.find(entry.key());
            if (position != std::string::npos)
            {
                return entry.value();
            }
        }

        return {};
    }

    /**
     * @brief Get CNA/ADP name based on the package vendor when it starts with a specific word.
     *
     * Walks the "prefix" section of the vendor map and returns the value of the first item whose key is a
     * prefix of the vendor string, as decided by Utils::startsWith.
     *
     * @param vendor Package vendor.
     * @return CNA/ADP name. Empty string otherwise.
     */
    std::string getCnaNameByPrefix(std::string_view vendor)
    {
        const auto& vendorMap = GlobalData::instance().vendorMaps();
        if (vendorMap.contains("prefix"))
        {
            // std::string_view::data() is not guaranteed to be null-terminated (and may be null for an empty
            // view), so it must not be used as a C string. Build the string from the view itself, once, instead
            // of on every iteration.
            const std::string vendorName(vendor);

            for (const auto& item : vendorMap.at("prefix"))
            {
                if (Utils::startsWith(vendorName, item.begin().key()))
                {
                    return item.begin().value();
                }
            }
        }

        return {};
    }

private:
    /**
     * Do not change the order of definition of these variables.
     * Since it is important at the object destruction time.
     *
     * Members are destroyed in reverse order of declaration, so with the current order the router
     * subscription is released before the translation cache and the feed database that its callback uses.
     * The constructor's member initializers also run in this declaration order.
     */
    // Mutex protecting the access to the internal databases; owned by the caller.
    std::shared_mutex& m_mutex;
    // Indexer connector handed to the FeedIndexer stage of the orchestration chain.
    std::shared_ptr<TIndexerConnector> m_indexerConnector;
    // Content updater registration; stays null when the content updater is not initialized.
    std::unique_ptr<TContentRegister> m_contentRegistration;
    // Feed database opened at DATABASE_PATH.
    std::unique_ptr<TRocksDBWrapper> m_feedDatabase;
    // Cache from package name to the key of its translation entry in the feed database.
    std::unique_ptr<LRUCache<std::string, std::string>> m_translationsCache;
    // Router subscription that delivers content update messages to processMessage().
    std::unique_ptr<TRouterSubscriber> m_contentUpdateSubscription;
    // Flag controlling the graceful shutdown of the module; owned by the caller.
    const std::atomic<bool>& m_shouldStop;

    void contentManagerUpdateOffset(const std::string& topicName, const long long currentOffset)
    {
        nlohmann::json data;
        data["offset"] = currentOffset;
        data["topicName"] = topicName;

        // Exclude from coverage because need to be tested in integration tests.
        // LCOV_EXCL_START
        TUNIXSocketRequest::instance().put(
            HttpUnixSocketURL(ONDEMAND_SOCK, "http://localhost/offset"),
            data,
            [](const std::string& msg) {},
            [](const std::string& msg, const long responseCode)
            { throw std::runtime_error("Error updating offset: " + msg); });
        // LCOV_EXCL_STOP
    }

    /**
     * @brief Reads the vendor and os cpe maps from the database and loads the data into memory.
     *
     * @throws std::runtime_error if the vendor and os cpe maps aren't available or are invalid.
     * @note This methods locks the mutex.
     */
    void reloadGlobalMaps()
    {
        std::scoped_lock<std::shared_mutex> lock(m_mutex);

        std::string result;
        if (!m_feedDatabase->get("FEED-GLOBAL", result, VENDOR_MAP_COLUMN))
        {
            throw std::runtime_error("Vendor map can not be found in DB.");
        }
        else if (result.empty())
        {
            throw std::runtime_error("Vendor map is empty.");
        }

        GlobalData::instance().vendorMaps(nlohmann::json::parse(result));

        rocksdb::PinnableSlice queryResult;
        if (!m_feedDatabase->get("OSCPE-GLOBAL", queryResult, OS_CPE_RULES_COLUMN))
        {
            throw std::runtime_error("Error getting OS CPE rules content from rocksdb.");
        }
        GlobalData::instance().osCpeMaps(nlohmann::json::parse(queryResult.ToString()));
    }
};

// Convenience alias: TDatabaseFeedManager instantiated with all of its default template arguments.
using DatabaseFeedManager = TDatabaseFeedManager<>;

#endif // _DATABASE_FEED_MANAGER_HPP
